package area

import java.util
import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.hive.ql.exec.UDAF
import org.apache.spark.{SparkConf, SparkContext, sql}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import sessionanalyze.conf.ConfigurationManager
import sessionanalyze.constant.Constants
import sessionanalyze.dao.factory.DAOFactory
import sessionanalyze.domain.AreaTop3Product
import sessionanalyze.test.MockData
import sessionanalyze.util.ParamUtils

/*
                    .::::.
                  .::::::::.
                 :::::::::::
             ..:::::::::::'
           '::::::::::::'		Goddess bless, never BUG
             .::::::::::
        '::::::::::::::..
             ..::::::::::::.
           ``::::::::::::::::
            ::::``:::::::::'        .:::.
           ::::'   ':::::'       .::::::::.
         .::::'      ::::     .:::::::'::::.
        .:::'       :::::  .:::::::::' ':::::.
       .::'        :::::.:::::::::'      ':::::.
      .::'         ::::::::::::::'         ``::::.
  ...:::           ::::::::::::'              ``::.
 ```` ':.          ':::::::::'                  ::::..
                    '.:::::'                    ':'````..
                    
 ━━━━━━━━━━━━━━━━━━━━ 女神保佑,永无BUG ━━━━━━━━━━━━━━━━━━━━
*/
object AreaTop3Product {

  /**
   * Entry point: for the task's date range, counts product clicks per area,
   * tags each area with a level, classifies each product as self-operated or
   * third-party, keeps the top-3 most-clicked products per area, and persists
   * the result through the DAO layer.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME_PAGE).setMaster("local")
    val sc = new SparkContext(conf)
    val sparkSession = SparkSession.builder().appName(Constants.SPARK_APP_NAME_SESSION).getOrCreate()

    // Generate mock data (registers the user_visit_action / product_info temp views).
    MockData.mock(sc, sparkSession)

    val taskDAO = DAOFactory.getTaskDAO
    // Resolve the task id from CLI args (falls back to the local-task config key).
    val taskId = ParamUtils.getTaskIdFromArgs(args, Constants.SPARK_LOCAL_TASKID_PAGE)
    val task = taskDAO.findById(taskId)
    if (task == null) {
      println("没有获取到对应的Task信息")
    } else {
      // Parse this task's JSON parameter payload (holds the date range).
      val taskParam = JSON.parseObject(task.getTaskParam)

      // Click actions restricted to the task's date range.
      val actionDF: DataFrame = getActionDFByDateRange(sparkSession, taskParam)
      println("actionDF数据条数：" + actionDF.count())

      // City dimension table loaded over JDBC.
      val cityInfoDF: DataFrame = getCityInfoDF(sparkSession)
      println("cityInfoDF数据条数：" + cityInfoDF.count())

      // Join on the shared column NAME so only one city_id column survives;
      // actionDF("city_id") === cityInfoDF("city_id") would keep duplicate columns.
      val joined: DataFrame = actionDF.join(cityInfoDF, "city_id")

      joined.createTempView("cityInfoProduct")
      // Tag every row with its area level (A级/B级/C级).
      setAreaFlag(sparkSession, "cityInfoProduct").createTempView("AreaFlag")

      // UDF: format one city as "id:name"; UDAF: aggregate city infos per group.
      sparkSession.udf.register("cityInfo", (cityId: Long, cityName: String) => cityId + ":" + cityName)
      sparkSession.udf.register("cityInfos", new CityInfoSUDAF)
      val areaCountDF: DataFrame = sparkSession.sql(
        "select area,click_product_id,sum(1) as click_count," +
          "cityInfos(cityInfo(city_id,city_name)) as city_infos " +
          "from AreaFlag group by area,click_product_id ")
      areaCountDF.createTempView("areaCount")
      sparkSession.sql("select * from areaCount").show()

      // Join product_info to classify each product: extend_info JSON field
      // product_status '0' => self-operated, otherwise third-party.
      val sql = "select area,product_id,click_count,city_infos,product_name," +
        "if(get_json_object(extend_info,'$.product_status')='0','Self','Third party') product_status " +
        "from areaCount join product_info " +
        "on areaCount.click_product_id=product_info.product_id"
      sparkSession.sql(sql).createTempView("productStatus")
      setAreaFlag(sparkSession, "productStatus").createTempView("productStatusFlag")

      // Window function: rank products within each area by click count and keep the top 3.
      // BUG FIX: "top 3" requires DESCENDING order — without `desc`, row_number
      // ranked ascending and this query returned the three LEAST-clicked products.
      val sql2 = "select * from (select *,row_number() over " +
        "(partition by area order by click_count desc) as row " +
        "from productStatusFlag )as tmp where row<=3"
      val df: DataFrame = sparkSession.sql(sql2)
      insertTop3(taskId, df)
    }
    // Release Spark resources before exiting.
    sc.stop()
  }

  /**
   * Persists the per-area top-3 rows into the AreaTop3Product table, one JDBC
   * batch per partition.
   *
   * Expected row layout (from the top-3 query in main):
   *   0 area, 1 product_id, 2 click_count, 3 city_infos,
   *   4 product_name, 5 product_status, 6 area_level, 7 row (unused)
   *
   * @param taskId id of the owning task, stamped onto every record
   * @param frame  top-3 result DataFrame
   */
  def insertTop3(taskId: Long, frame: sql.DataFrame): Unit = {
    frame.rdd.foreachPartition(rows => {
      // BUG FIX: create the DAO *inside* the partition function. It executes on
      // an executor; a JDBC-backed DAO built on the driver would have to be
      // serialized into the closure, which typically fails (not Serializable).
      val areaTop3ProductDAO = DAOFactory.getAreaTop3ProductDAO
      val list = new util.ArrayList[AreaTop3Product]()
      rows.foreach(row => {
        val data = new AreaTop3Product
        data.setTaskid(taskId)
        data.setArea(row.getString(0))
        data.setAreaLevel(row.getString(6))
        data.setCityInfos(row.getString(3))
        data.setClickCount(row.getLong(2))
        data.setProductid(row.getLong(1))
        data.setProductName(row.getString(4))
        data.setProductStatus(row.getString(5))
        list.add(data)
      })
      // One batched insert per partition keeps the number of DB round-trips low.
      areaTop3ProductDAO.insertBatch(list)
    })
    println("插入数据表AreaTop3Product 完成")
  }

  /**
   * Adds an `area_level` column to `tableName` via a CASE WHEN over the `area`
   * column: 华北/华东 => A级, 华南/华中 => B级, 西北/西南/东北 => C级.
   *
   * @param tableName name of a registered temp view containing an `area` column
   * @return the input rows with the extra `area_level` column
   */
  def setAreaFlag(sparkSession: SparkSession, tableName: String): DataFrame = {
    val sql = "select *,case when area='华北' or area='华东' then 'A级' " +
                        "when area='华南' or area='华中' then 'B级' " +
                        "when area='西北' or area='西南' then 'C级' " +
                        "when area='东北' then 'C级' end as area_level " +
                        "from " + tableName
    val areaFlagDF: DataFrame = sparkSession.sql(sql)
    areaFlagDF
  }

  /**
   * Loads the `city_info` dimension table over JDBC using the connection
   * settings from ConfigurationManager, registers it as a temp view, and
   * returns it as a DataFrame.
   */
  def getCityInfoDF(sparkSession: SparkSession): DataFrame = {
    // JDBC connection properties.
    val props = new Properties()
    props.put("user", ConfigurationManager.getProperty(Constants.JDBC_USER))
    props.put("password", ConfigurationManager.getProperty(Constants.JDBC_PASSWORD))
    // NOTE(review): Spark's JDBC reader expects the literal property key
    // "driver". If Constants.JDBC_DRIVER is the config-file key (e.g.
    // "jdbc.driver") rather than "driver", the driver class is never applied
    // — verify the constant's value.
    props.put(Constants.JDBC_DRIVER, ConfigurationManager.getProperty(Constants.JDBC_DRIVER))
    val url = ConfigurationManager.getProperty(Constants.JDBC_URL)
    sparkSession.read.jdbc(url, "city_info", props).createTempView("city_info")
    val sql = "select * from city_info"
    val cityInfoDF: DataFrame = sparkSession.sql(sql)
    cityInfoDF
  }

  /**
   * Returns (city_id, click_product_id) pairs from `user_visit_action`
   * restricted to the task's [start_date, end_date] range, with null
   * click_product_id rows (non-click actions) filtered out.
   */
  def getActionDFByDateRange(sparkSession: SparkSession, taskParam: JSONObject): DataFrame = {
    // Parse the date range from the task parameters.
    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)

    // NOTE(review): filters on a column named `Date` — confirm the mock
    // schema's casing (likely lowercase `date`); Spark SQL is case-insensitive
    // by default so this works unless spark.sql.caseSensitive is enabled.
    val sql = s"select * from user_visit_action where Date >= '$startDate' and Date <= '$endDate'"
    val actionDF: DataFrame = sparkSession.sql(sql)
    // Keep only the columns needed downstream and drop non-click records.
    actionDF.select("city_id", "click_product_id").where("click_product_id is not null")
  }
}
